WeNet 更新:超大规模数据 UIO,支持千万小时语音训练任务
旧有 IO 方案的问题
内存溢出(OOM)问题:在超大规模数据时,一般机器的物理内存已经难以一次性加载训练数据的索引信息。 读取性能慢:旧有方案为随机读取,在超大规模数据内存无法做文件 cache 的情况下,训练数据读取速度大幅度降低,从而导致训练速度慢。
UIO 解决方案
webdataset: https://github.com/webdataset/webdataset tfrecord:Tensorflow 的 IO 解决方案
内存中仅需维护压缩包的的索引信息,从而大大节省了内存,解决了 OOM 的问题。 读取时在内存中进行 on-the-fly 的解压缩,同一个压缩包内的数据顺序读取,解决了随机读取性能慢的问题。不同的压缩包可以进行随机读取,保证了数据的全局随机性。
其中:
Small IO 为小数据集支持,我们称之为
raw
模式,该模式仅支持本地文件读取。所需文件须整理成 Kaldi 风格的语音列表文件 wav.scp 和标注列表文件 text。Big IO 为大数据集支持,我们称之为
shard
模式,该模式既可以支持本地文件读取,也可以支持网络云存储文件的读取。所需文件须整理成压缩包形式,单个压缩包内顺序存储了音频(wav)和其标注(txt)。
TFRecord 中的链式 IO 示例
def read_dataset(filename, batch_size):
dataset = tf.data.TFRecordDataset(filename)
dataset = dataset.map(_parse_image_function, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.shuffle(500)
dataset = dataset.batch(batch_size, drop_remainder=True)
dataset = dataset.repeat()
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
return dataset
tokenize 模块: 将标注解析成建模单元 char 或者 BPE。 filter 模块:过滤掉训练数据中音频过长过短、标注过长过短的句子。 resample 模块: 对训练数据进行可选的重采样。 compute_fbank 模块:特征提取 spec_augmentation 模块:对特征进行 spec_augmentation 增强。 shuffle 模块:对数据进行局部 shuffle。 sort 模块:对数据进行局部排序。 batch 模块:将多条数据组织成 batch。 padding 模块:对同一 batch 内的数据进行 padding 处理。
实验
AIshell
raw
还是 shard
的模式,均能取得与之前旧有方案相当的识别率。WenetSpeech
shard
方案训练。WeNet 和 ESPnet 使用相近的模型结构和参数配置,两者取得相近的识别率,说明了 UIO 方案在大规模数据上的正确性。在训练过程中,我们观测到 UIO 的 GPU 整体的利用率在 80% ~ 90% 以上,说明整体 IO 的读取效率很高。如何使用?
train_data(cv_data/test_data): data.list data_type: 可以为raw或者shard symbol_table: 训练时的建模单元字典文件
python wenet/bin/train.py --gpu $gpu_id \
--config $train_config \
--data_type $data_type \
--symbol_table $dict \
--train_data $feat_dir/$train_set/data.list \
--cv_data $feat_dir/dev/data.list \
...
raw
的文件,data.list 的形式如下所示,每一行为一行 json 序列化的字符串,该 json 串中含有 key, wav, txt 三个字段。{"key": "BAC009S0002W0122", "wav": "/export/data/asr-data/OpenSLR/33//data_aishell/wav/train/S0002/BAC009S0002W0122.wav", "txt": "而对楼市成交抑制作用最大的限购"}
{"key": "BAC009S0002W0123", "wav": "/export/data/asr-data/OpenSLR/33//data_aishell/wav/train/S0002/BAC009S0002W0123.wav", "txt": "也成为地方政府的眼中钉"}
{"key": "BAC009S0002W0124", "wav": "/export/data/asr-data/OpenSLR/33//data_aishell/wav/train/S0002/BAC009S0002W0124.wav", "txt": "自六月底呼和浩特市率先宣布取消限购后"}
shard
的文件,data.list 中每一行为一个统一资源标识符 URI 的地址,该地址可以为本地文件,或者网络文件 HTTP/HTTPS/FTP 等形式。通过网络接口形式,我们即可实现对 S3/OSS/HDFS 等分布式存储系统的支持。例如: 对于本地文件 data.list 的形式为:/export/maryland/binbinzhang/code/wenet/examples/aishell/s3/raw_wav/train/shards/shards_000000000.tar.gz
/export/maryland/binbinzhang/code/wenet/examples/aishell/s3/raw_wav/train/shards/shards_000000001.tar.gz
/export/maryland/binbinzhang/code/wenet/examples/aishell/s3/raw_wav/train/shards/shards_000000002.tar.gz
https://examplebucket.oss-cn-hangzhou.aliyuncs.com/exampledir/1.tar.gz
https://examplebucket.oss-cn-hangzhou.aliyuncs.com/exampledir/2.tar.gz
实现细节
如何做数据的 Distributed Partition? 根据rank 和 num_workers 对数据进行分割即可,如以下代码所示:
class DistributedSampler:
def __init__(self, shuffle=True, partition=True):
self.epoch = -1
self.update()
self.shuffle = shuffle
self.partition = partition
def set_epoch(self, epoch):
self.epoch = epoch
def sample(self, data):
data = list(range(len(data)))
if self.partition:
if self.shuffle:
random.Random(self.epoch).shuffle(data)
data = data[self.rank::self.world_size]
data = data[self.worker_id::self.num_workers]
return data
如何处理不平衡的数据?
在训练时每个 rank 上分配到的数据不一定完全一样多,即使分配到一样多的数据,在后续处理中也会有数据过滤,也会导致每个 rank 上分配的数据不一样多。使用 model.join() 处理每个rank上分配到的数据不均衡, 详请参考 https://pytorch.org/tutorials/advanced/generic_join.html#how-does-join-work
Buffer size, 目前的设置中有两处 buffer。
Shuffle buffer:用来 shuffle 数据,这个 buffer 设置的大小建议大于单个 shards 中含有的数据数量,每次相当于数据在两个 shards 间做 shuffle,增大了数据的随机性。例如每个 shard 中含1000 条语音时,可以设置 shuffle_buffer 为 1500. Sort buffer: 用来对数据按语音帧数排序,这个操作很重要,能大大提高训练速度。 当然,两个 buffer 均不建议设置的过大,设置过大时,一方面比较占内存,二是程序可能卡在读满一个 buffer 这一步上。
Prefetch
Pytorch Dataloader 中使用 prefetch 来预读数据,prefetch的粒度是最终训练的batch,默认参数为2,也就是默认会预读两个 batch 的数据。在新 IO 的设计中,因为有前置 buffer 的存在,预读的数据可能已经在 buffer 中,从而没有去做真正的预读,等到下次训练时,buffer 中的数据不足,才会 on the fly 的再去填充 buffer,这时训练即 block 在了读数据上。简言之,就是在prefetch 很小的时候训练在部分时间会 block 在读数据上,因为前面某一级还在缓存数据。所以要设置比较大的 prefetch 来避免这个问题,例如:
shards: 1000 条 buffer: 1000 条 batch_size: 32 pretch: 100 相当于预读了 32 * 100,大约是3个shards的内容。
读者潜在的问题
为什么不直接使用 TFRecord?
TFRecord 是为 Tensorflow 专门设计的一个重量级的库,使用 protobuf 进行数据组织,protobuf 格式数据可读性差。Pytorch 中也缺乏相应的 TFRecord 的生态。
为什么不直接使用 webdataset?
WeNet 中 UIO 的设计理念和方式和 webdataset 非常相近。但从目前现有的实现来看,webdataset 是基于图像数据设计的,在语音任务中使用的话,仍需做大量的修改。并且,我们希望在小数据集时,依然使用原始的语音文件,webdataset 中也不支持。
为什么使用 LMDB, HDF5 等 key/value 的数据库?
这些数据库能够解决大量小文件的问题,但依然不能解决文件随机读取的问题,也不支持 Cloud IO。
合作推广稿件投稿 | 项目推广 | 合作咨询
探讨更多合作内容,加联系人微信
永久福利 直投简历
(简历投递):yuyinzatan@163.com
语音杂谈内推助力,leader直收简历
企业招聘旺季,推荐机会不容错过
觉得本篇文章不错?
扫码关注我们
语音人的技术客栈
专注于语音技术分享与干货推送